This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Conversation

@foxish (Member) commented Apr 10, 2017

This is the initial PR (some integration test fixes & scalastyle fixes pending).
I hope to keep the scope of this minimal and more or less what it is now, with future PRs adding:

  • Authentication/SSL
  • Cleanup of shuffle files
  • Better isolation when using hostPath

cc @apache-spark-on-k8s/contributors

@foxish force-pushed the dynamic-allocation branch from e33dbb9 to d6b3269 on April 10, 2017 21:24
@ash211 left a comment

Realize this is a WIP, so I've left just cursory comments for now.


was not having this ready check a bug before?

Member Author

Yes, I think so.


My understanding of the API is that this is supposed to have "fire and forget" semantics, where the executors are requested but there's no guarantee that they are up after they have been requested. The future's return value only indicates whether the request was acknowledged, not whether the executors actually ended up starting.

Member Author

I think you're right. I'm trying to understand whether we should then launch each pod and wait for it to reach Running before continuing in this method. Would that make sense? I think the method is called in a way that there aren't several parallel invocations.


I don't think so - we're returning a future here so even though we don't execute this method multiple times in parallel we might be executing multiple instances of the future at once. Thus we should probably return immediately after creating the pods without waiting for them to become ready. We can have a separate monitor / watch that handles when the executor pods fail to launch.


colocatedPods

Member Author

Done


executor -> shuffle

Member Author

Done


The sbin/ folder usually has just binaries in it; maybe put this in conf/ instead.

Member Author

Done


Is this the path on the node, on the shuffle pod, or on the executor pod?

Member Author

This is the path on the executor pod. For now, it's up to the user to ensure that it's consistent with that on the node and the shuffle pod.


I think the rest of Spark uses its local dir instead of the tmpdir for shuffle sans shuffle service?


This logic is very similar to the logic farther below -- maybe extract a (Pod => Boolean) predicate function called isPodReady that can be used in both places?


Use Readiness.isReady() from the Kubernetes client library.
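
For illustration, a minimal sketch of the suggested predicate, assuming the fabric8 client (the import path may differ across client versions):

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.internal.readiness.Readiness

// A reusable (Pod => Boolean) predicate that could back both of the similar checks.
private def isPodReady(pod: Pod): Boolean = Readiness.isReady(pod)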


Use a case class for clarity instead of returning a tuple.
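
For example, something along these lines (field names are hypothetical; the actual fields depend on what the tuple currently holds):

// Hypothetical: names the values that were previously returned as an unnamed tuple.
case class ShuffleServiceConfig(
    shuffleNamespace: String,
    shuffleLabels: Map[String, String],
    shuffleDirs: Seq[String])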

Member Author

Done

@mccheah Apr 12, 2017

Use Option.map instead of match here. Using map and getOrElse we can use a more fluent builder pattern. Perhaps something like this:

val basePodBuilder = new PodBuilder().<default configuration>.endSpec()
val resolvedPodBuilder = shuffleDir.map { dir =>
  basePodBuilder.editSpec()
    .setVolumes(...)
    .editContainerAt(0).<edits>.endContainer() // Can also use editMatchingContainer()
    .endSpec()
}.getOrElse(basePodBuilder)
kubernetesClient.pods().create(resolvedPodBuilder.build())

We could also build the container separately from the pod, but I'm not sure how much that buys us. That would look something like this:

val baseContainerBuilder = new ContainerBuilder().<default configurations>
val basePodBuilder = new PodBuilder().<default configuration>.endSpec()
val resolvedPodBuilder = shuffleDir.map { dir =>
  basePodBuilder.editSpec()
    .setVolumes(...)
    .endSpec()
}.getOrElse(basePodBuilder)
val resolvedContainerBuilder = shuffleDir.map { dir =>
  baseContainerBuilder.addToVolumeMounts(...)
}.getOrElse(baseContainerBuilder)
val resolvedPod = resolvedPodBuilder.editSpec()
  .addContainer(resolvedContainerBuilder.build())
  .endSpec()
  .build()


Should this command be running the shuffle service? Use bin/spark-class to help here if that's the case.

Member Author

Fixed


Format seems off


Is the reason for modifying RetrieveSparkAppConfig just to get the executorId? You should already get the executorId with the RegisterExecutor message.
My 2 cents: it would be better to have as few modifications to Spark core as possible.

Member Author

The reason is that the driver needs to send slightly different config to each executor - each executor gets a different shuffle service pod IP. The driver needs to differentiate between executors when they fetch config. We could do this at the time of the RegisterExecutor message, but we would need to extend and override parts of CoarseGrainedExecutorBackend.scala, and I suspect the changes would be more complex if we did that.
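
A hedged sketch of the shape of that change (not necessarily the PR's exact code): carrying the executor ID in the config-retrieval message lets the driver answer each executor with its own shuffle service IP.

// In CoarseGrainedClusterMessages: the existing RetrieveSparkAppConfig message
// gains the requesting executor's ID (illustrative).
case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage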

@ash211 commented Apr 14, 2017

I took a look at the local storage proposal, and I think it helps us with allocating local disk space for shuffle and capping its amount, but I didn't see any mention in the proposal doc of the multiple-mounts scenario. During a shuffle, the executor pods will have a volume mounted that they're writing to, while the shuffle service simultaneously serves files from that volume to consumers on other nodes. Most likely the two volume mounts will overlap, with the executor pod's volume being a subfolder of the shuffle service pod's volume on the node.

@foxish force-pushed the dynamic-allocation branch from 4a007e2 to 6742ab2 on April 18, 2017 00:06
@foxish changed the title from "Dynamic Executor Scaling: Initial" to "[WIP] Dynamic Executor Scaling" on Apr 18, 2017
@foxish (Member Author) commented Apr 18, 2017

@tnachen @ash211 @mccheah Addressed comments. PTAL


Is this an official image that we'll be supporting?


+1 to @tnachen's question. Maybe add a comment pointing to the Dockerfile below?


Is this a new configuration you are introducing? We will definitely need documentation for it, to explain what it is for.


Also please use the ConfigBuilder from config.scala and its .internal() marker if this is not meant to be set by users and is only used internally.
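
For reference, ConfigBuilder usage looks roughly like this; the key name, doc string, and default below are illustrative only:

import org.apache.spark.internal.config.ConfigBuilder

private[spark] val SHUFFLE_SERVICE_HOST =
  ConfigBuilder("spark.shuffle.service.host")
    .doc("Host of the colocated shuffle service; set by the driver rather than the user.")
    .internal()  // drop this marker if the setting is meant to be user-facing
    .stringConf
    .createOptional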

Member Author

@ash211 do you think we should add it to spark-core alongside spark.shuffle.service.port, which already exists there?

Member Author

I used the ConfigBuilder for references to spark.shuffle.service.host within the Kubernetes package, and left it as-is here, conforming to the surrounding code in spark-core.


Fix the formatting; I don't think this is typical Spark style (one arg per line, indented 4 spaces).

Member Author

Done


Debug only?


looks like setting it lets you override the IP that the executor thinks its colocated shuffle service is on -- should put that in the doc


You should move this back to match Spark style


ditto Style

@ash211 commented Apr 19, 2017

rerun integration test please

@kimoonkim (Member) left a comment

I only took a cursory look at the main code. But it seems pretty good as is. Thanks.


/tmp might not be ideal in many setups; e.g., it may be mounted on a small memfs that isn't large enough for large shuffle data. It might be better to use a unique dir here, like spark-shuffle-dir, and document that people should prepare the dir if necessary.

Eventually, we should perhaps make this configurable by turning it into a Helm chart?

Member Author

I don't think it would be a Helm chart, since it doesn't deploy an entire application, just a dependent component (and the whole overhead of Helm/Tiller would be prohibitive). This is only a sample YAML that a user might use; I was thinking of distributing a YAML file that the user can modify according to their needs.

@kimoonkim (Member) Apr 28, 2017

Trying to understand your points.

it doesn't deploy an entire application

Helm can be used for deploying a service, and I consider the Spark shuffle service a service independent of individual Spark jobs.

Or, by "an entire application", do you mean the fact that the docker image may not be what the user wants?

the whole overhead of helm/tiller would be prohibitive

By "overhead", do you mean the "setup steps" such as downloading helm and doing $ helm init? Or do you mean the "runtime" cpu, etc overhead?

If you mean "setup steps", it may not be a lot compared with the alternative of modifying this yaml file on her own and doing $ kubectl create. At least that was my experience. Another thing to consider is mucking with yaml can easily lead to syntax errors, and troubleshooting hours. Helm chart is a black box shielding inexperienced users from that.

Anyway, I wouldn't block this PR for not having Helm. I just think it is a nice way to go for future iterations of this.

Helm or not, I still think putting /tmp here is not a good example.

One more related point: we may want to show multiple dirs in this example. Shuffle is seek-intensive, and using multiple disks is quite common, so we may want to show how that can be done.

Member Author

By "overhead", do you mean the "setup steps" such as downloading helm and doing $ helm init? Or do you mean the "runtime" cpu, etc overhead?

Yes. Helm is one package manager, but it's not necessarily installed on most clusters. In my opinion, it would be excessive to expect a Helm installation just to run a sub-component of Spark.

Independent of helm or not, I still think putting /tmp here is a not a good example.

Fair enough, I'll include a better example with multiple directories then. What do you think would be a good default shuffle directory on hosts? /shuffle-files or something?


Makes sense. You're trying to lower the adoption barrier.

I would think /spark-shuffle-files-0, /spark-shuffle-files-1, etc.


Updating with the result of the SIG meeting discussion: we agreed /var/tmp would be a good default base dir for this. So maybe /var/tmp/spark-shuffle-files-0, /var/tmp/spark-shuffle-files-1, etc.


+1 to @tnachen's question. Maybe add a comment pointing to the Dockerfile below?


Undo the formatting change here -- though IDEs don't handle this well, unfortunately.


Move the dot at the end of line 312 to this line


Use the Client class directly, since using spark-submit manipulates system properties.

@mccheah commented Apr 20, 2017

I wonder what more we can do for testing here. It would be nice, for example, to verify that reads and writes are happening on the pods at the disk level, and that the shuffle service pod can see what is being written by the executor pod. Have we also confirmed that the test job reads and writes shuffle files? More testing at the disk level will allow us to verify the changes we make when considering e.g. isolation.

We also should consider testing KubernetesClusterSchedulerBackend at the unit level. The V2 submission revamp was an opportunity to make our testing more rigorous at the unit level, particularly in #227. Can we consider if we can do something similar for this change?


Also please use the ConfigBuilder from config.scala and its .internal() marker if this is not meant to be set by users and is only used internally.


I expect this to be set by the user, so remove .internal()


remove .internal()


remove .internal()


When launching, say, 100 executors at the start of a job, with each executor registering with the driver, this will trigger 100 near-simultaneous requests to the apiserver for running shuffle service pods.

Does it make sense instead to keep a list locally that's updated by a Watch, and to refer to that cached local list instead of querying apiserver each time?

Not sure the impact of this level of query load on apiserver.
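
A rough sketch of such a watch-backed cache with the fabric8 client; the client handle, label selector, and map keys here are assumptions, and a real implementation would need synchronization and error handling:

import scala.collection.JavaConverters._
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}

// Local cache of node name -> shuffle pod IP, kept up to date by a single watch
// instead of hitting the apiserver on every executor registration.
private val shufflePodCache = mutable.Map[String, String]()

private val shuffleWatch = kubernetesClient.pods()
  .withLabels(shuffleServiceLabels.asJava)  // hypothetical label selector
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Watcher.Action, pod: Pod): Unit = action match {
      case Watcher.Action.ADDED | Watcher.Action.MODIFIED =>
        shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
      case Watcher.Action.DELETED =>
        shufflePodCache.remove(pod.getSpec.getNodeName)
      case _ =>
    }
    override def onClose(cause: KubernetesClientException): Unit = {}
  })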


I agree with this idea.

Member Author

That's a good idea. I'll add this caching in


looks like setting it lets you override the IP that the executor thinks its colocated shuffle service is on -- should put that in the doc


something like colocatedPods.map(_.getStatus.getPodIP).mkString(",")


feels like not quite the right match expression here -- how about just _ ?


@mccheah how to match against a 1-item list here? Not sure if it's a Seq or something else at this point


Why the nice? Should we do this in the other Dockerfiles too?


Thought 0 is the default?


With #227 merged, each component no longer needs to have its own assembly XML file.


Nit: src/main/docker/shuffle -> src/main/docker/shuffle-service for a tad more clarity.

@foxish force-pushed the dynamic-allocation branch from b0751cb to 061bc92 on April 28, 2017 08:23
}
totalExpectedExecutors.set(requestedTotal)

allPodsReady

IIUC here doRequestTotalExecutors should return true unless it fails to send the scaling request to the cluster manager. The return value has nothing to do with whether the executors are ready or not.

Both the YARN and Mesos backend implementations do something like that.


IMO we should not call the k8s API directly inside doRequestTotalExecutors, because the returned Future is awaited by the base class's requestExecutors method with a timeout. The current code could block the dynamic allocation manager thread for a long time, e.g. when there is an intermittent network problem, or cause it to exit if any exception is thrown while calling the k8s API. We should use a worker thread to do it, wrap any possible k8s exception, and return a real Future[Boolean] here.


K8s API calls are asynchronous in and of themselves, as long as we're not specifically watching the resources for their readiness. I think it's actually semantically correct to try to request the executors here from the API server. If there's a network timeout or a failure, then the state of the Future should reflect that. YarnSchedulerBackend does this as well, in that it queries the application master with a message indicating the target number of executors that is desired. In client mode this could be a remote call.

Member Author

There is one difference here, I think: Kubernetes will not refuse to accept new pods even if there are no resources; they'll just go Pending. We need some mechanism in between to ensure we do not request more executors than can be accommodated at the current time.

@mccheah May 1, 2017

This is also the case in YARN. The request to the application master only sets the desired number of executors: (see this method). The application master will then continuously attempt to fetch these resources after the initial request in its allocator thread.


Thus even if the cluster does not actually have the sufficient resources available, true is still returned by doRequestTotalExecutors in YARN mode.


Ah, I didn't realize the body of doRequestTotalExecutors actually runs in a thread pool provided by the implicit val requestExecutorContext executor.

But the other question still holds: we should return true unless it fails to contact the k8s API.
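
A minimal sketch of those semantics (not the PR's actual code), running on the implicit requestExecutorContext mentioned above: record the desired total, return true once the request has been handed off, and leave pod creation and readiness to a separate allocator/watch.

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future {
  // Only record the target; a separate allocator reconciles actual pods toward it.
  totalExpectedExecutors.set(requestedTotal)
  true  // the request was accepted; pod launch failures are surfaced elsewhere
}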

Member Author

I've changed some of the logic and added a separate KubernetesAllocator that is in charge of performing actual allocations. It's still WIP but should explain the direction I'm thinking of. @mccheah @lins05 PTAL

@foxish (Member Author) May 3, 2017

The last commit is the interesting one, and the 10 seconds between attempted allocations is just an example. I expect that we'll let the user configure (via some new Spark option) how often to try to allocate new executor pods.

@foxish force-pushed the dynamic-allocation branch 2 times, most recently from 8db4287 to f342055 on May 3, 2017 15:49
@foxish force-pushed the dynamic-allocation branch from f342055 to 5308f17 on May 3, 2017 15:50
interval: Int,
backend: KubernetesClusterSchedulerBackend) extends Logging {

private val scheduler = Executors.newScheduledThreadPool(1)

Wondering if this should be a daemon thread. If yes, then use ThreadUtils from the Spark packages to create a "named" daemon thread.


+1 use a daemon thread and use ThreadUtils.
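
For instance, a sketch of what that could look like (the thread name is just an example):

import org.apache.spark.util.ThreadUtils

// A named, daemon scheduled executor, so the allocator thread cannot keep the JVM alive.
private val scheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")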

backend: KubernetesClusterSchedulerBackend) extends Logging {

private val scheduler = Executors.newScheduledThreadPool(1)
private val allocatorRunnable: Runnable = new Runnable {

Instead of using a Java Runnable and explicitly submitting the task, consider using Scala's ExecutionContext and Future blocks.

// This is a set of all ready executor pods.
private var readyExecutorPods = Set[String]()

client

Start the watch in start() and keep a reference to it. Close the watch in stop().


Just noticed this isn't part of the scheduler backend, so methods like start() and stop() aren't defined. But we should be starting and stopping the watch in KubernetesClusterSchedulerBackend#start() and KubernetesClusterSchedulerBackend#stop() analogously.

* KubernetesAllocator class watches all the executor pods associated with
* this SparkJob and creates new executors when it is appropriate.
*/
private[spark] class KubernetesAllocator(

This could be pulled out to a separate class.


Sorry, meant separate file.


Actually maybe it doesn't belong in a separate class at all - the reference to a KubernetesClusterSchedulerBackend in this class makes me think this logic actually belongs in the scheduler backend itself.

@foxish (Member Author) commented May 3, 2017

rerun integration test please


We don't need a separate executor limit here, right? Can we not use the expected number of executors, and then block on the executors we've specifically requested in this round to be ready? I think it's less clear if we have to track more global state in having both an executor limit and an expected executor count.

@foxish force-pushed the dynamic-allocation branch 2 times, most recently from af50dc0 to 39572cf on May 14, 2017 07:35